8.2 通道

相比Erlang,Go并未实现严格的并发安全。

允许全局变量、指针、引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致和完整性。Go鼓励使用CSP通道,以通信来代替内存共享,实现并发安全。

Don’t communicate by sharing memory,share memory by communicating.

CSP:Communicating Sequential Process.

通过消息来避免竞态的模型除了CSP,还有Actor。但两者有较大区别。

作为CSP核心,通道(channel)是显式的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。可如果另一端未准备妥当,或消息未能及时处理时,会阻塞当前端。

相比起来,Actor是透明的,它不在乎数据类型及通道,只要知道接收者信箱即可。默认就是异步方式,发送方对消息是否被接收和处理并不关心。

从底层实现上来说,通道只是一个队列。同步模式下,发送和接收双方配对,然后直接复制数据给对方。如配对失败,则置入等待队列,直到另一方出现后才被唤醒。异步模式抢夺的则是数据缓冲槽。发送方要求有空槽可供写入,而接收方则要求有缓冲数据可读。需求不符时,同样加入等待队列,直到有另一方写入数据或腾出空槽后被唤醒。

除传递消息(数据)外,通道还常被用作事件通知。

func main() { done:=make(chan struct{}) // 结束事件 c:=make(chan string) // 数据传输通道

go func() { s:= c // 接收消息 println(s)
close(done) // 关闭通道,作为结束通知 }()

c “hi!” // 发送消息 done // 阻塞,直到有数据或管道关闭 }

输出:

hi!

同步模式必须有配对操作的goroutine出现,否则会一直阻塞。而异步模式在缓冲区未满或数据未读完前,不会阻塞。

func main() { c:=make(chan int,3) // 创建带3个缓冲槽的异步通道

c1 // 缓冲区未满,不会阻塞 c2

println(c) // 缓冲区尚有数据,不会阻塞 println(c) }

输出:

1 2

多数时候,异步通道有助于提升性能,减少排队阻塞。

缓冲区大小仅是内部属性,不属于类型组成部分。另外通道变量本身就是指针,可用相等操作符判断是否为同一对象或nil。

func main() { var a,b chan int=make(chan int,3),make(chan int) var c chan bool

println(ab) println(cnil)

fmt.Printf(“%p, %d\n”,a,unsafe.Sizeof(a)) }

输出:

false true 0xc820076000,8

虽然可传递指针来避免数据复制,但须额外注意数据并发安全。

内置函数cap和len返回缓冲区大小和当前已缓冲数量;而对于同步通道则都返回0,据此可判断通道是同步还是异步。

func main() { a,b:=make(chan int),make(chan int,3)

b1 b2

println(“a:“,len(a),cap(a)) println(“b:“,len(b),cap(b)) }

输出:

a:0 0 b:2 3

收发

除使用简单的发送和接收操作符外,还可用ok-idom或range模式处理数据。

func main() { done:=make(chan struct{}) c:=make(chan int)

go func() { defer close(done) // 确保发出结束通知

   for{ 
       x,ok:= <-c
       if!ok{                     // 据此判断通道是否被关闭 
           return
        } 

       println(x) 
    } 
}() 

c1 c2 c3 close(c)

<-done

}

输出:

1 2 3

对于循环接收数据,range模式更简洁一些。

func main() { done:=make(chan struct{}) c:=make(chan int)

go func() { defer close(done)

   for x:=range c{         // 循环获取消息,直到通道被关闭 
       println(x) 
    } 

}()

c1 c2 c3 close(c)

<-done

}

及时用close函数关闭通道引发结束通知,否则可能会导致死锁。

fatal error:all goroutines are asleep-deadlock!

通知可以是群体性的。也未必就是通知结束,可以是任何需要表达的事件。

func main() { var wg sync.WaitGroup ready:=make(chan struct{})

for i:=0;i<3;i++ { wg.Add(1)

   go func(id int) { 
       defer wg.Done() 

       println(id, ":ready.")       // 运动员准备就绪 
        <-ready                       // 等待发令 
       println(id, ":running...") 
    }(i) 
} 

time.Sleep(time.Second) println(“Ready?Go!”)

close(ready) // 砰!

wg.Wait() }

输出:

0:ready. 2:ready. 1:ready.

Ready?Go!

1:running… 0:running… 2:running…

一次性事件用close效率更好,没有多余开销。连续或多样性事件,可传递不同数据标志实现。还可使用sync.Cond实现单播或广播事件。

对于closed或nil通道,发送和接收操作都有相应规则:

  • 向已关闭通道发送数据,引发panic。
  • 从已关闭接收数据,返回已缓冲数据或零值。
  • 无论收发,nil通道都会阻塞。

func main() { c:=make(chan int,3)

c10 c20 close(c)

for i:=0;i<cap(c)+1;i++ { x,ok:= c println(i, ”:“,ok,x) } }

输出:

0:true 10 1:true 20 2:false 0 3:false 0

重复关闭,或关闭nil通道都会引发panic错误。

panic:close of closed channel panic:close of nil channel

单向

通道默认是双向的,并不区分发送和接收端。但某些时候,我们可限制收发操作的方向来获得更严谨的操作逻辑。

尽管可用make创建单向通道,但那没有任何意义。通常使用类型转换来获取单向通道,并分别赋予操作双方。

func main() { var wg sync.WaitGroup wg.Add(2)

c:=make(chan int) var send chanint=c var recvchan int=c

go func() { defer wg.Done()

   for x:=range recv{ 
       println(x) 
    } 
}() 

go func() { defer wg.Done() defer close(c)

   for i:=0;i<3;i++ { 
       send<-i
    } 
}() 

wg.Wait() }

不能在单向通道上做逆向操作。

func main() { c:=make(chan int,2)

var send chanint=c var recvchan int=c

<-send               // 无效操作: <-send(receive from send-only type chan<-int) 

recv1 // 无效操作:recv1(send to receive-only typechan int) }

同样,close不能用于接收端。

func main() { c:=make(chan int,2) var recvchan int=c

close(recv) // 无效操作:close(recv) (cannot close receive-only channel) }

无法将单向通道重新转换回去。

func main() { var a,b chan int

a=make(chan int,2) var recvchan int=a var send chanint=a

b= (chan int)(recv) // 错误:cannot convert recv(typechan int)to type chan int b= (chan int)(send) // 错误:cannot convert send(type chanint)to type chan int }

选择

如要同时处理多个通道,可选用select语句。它会随机选择一个可用通道做收发操作。

func main() { var wg sync.WaitGroup wg.Add(2)

a,b:=make(chan int),make(chan int)

go func() { // 接收端 defer wg.Done()

   for{ 
       var( 
           name string
           x  int
           ok bool
        ) 

       select{            // 随机选择可用channel接收数据 
       case x,ok= <-a: 
           name= "a" 
       case x,ok= <-b: 
           name= "b" 
        } 

       if!ok{            // 如果任一通道关闭,则终止接收 
           return
        } 

       println(name,x)     // 输出接收的数据信息 
    } 
}() 

go func() { // 发送端 defer wg.Done() defer close(a) defer close(b)

   for i:=0;i<10;i++ { 
       select{            // 随机选择发送channel
       case a<-i: 
       case b<-i*10: 
        } 
    } 
}() 

wg.Wait() }

输出:

b 0 a 1 a 2 b 30 a 4 a 5 b 60 b 70 a 8 b 90

如要等全部通道消息处理结束(closed),可将已完成通道设置为nil。这样它就会被阻塞,不再被select选中。

func main() { var wg sync.WaitGroup wg.Add(3)

a,b:=make(chan int),make(chan int)

go func() { // 接收端 defer wg.Done()

   for{ 
       select{ 
       case x,ok:= <-a: 
           if!ok{         // 如果通道关闭,则设置为nil,阻塞 
               a=nil
               break
            } 

           println("a",x) 
       case x,ok:= <-b: 
           if!ok{ 
               b=nil
               break
            } 

           println("b",x) 
        } 

       if a==nil&&b==nil{     // 全部结束,退出循环 
           return
        } 
    } 
}() 

go func() { // 发送端a defer wg.Done() defer close(a)

   for i:=0;i<3;i++ { 
       a<-i
    } 
}() 

go func() { // 发送端b defer wg.Done() defer close(b)

   for i:=0;i<5;i++ { 
       b<-i*10
    } 
}() 

wg.Wait() }

输出:

b 0 b 10 b 20 b 30 b 40 a 0 a 1 a 2

即便是同一通道,也会随机选择case执行。

func main() { var wg sync.WaitGroup wg.Add(2)

c:=make(chan int)

go func() { // 接收端 defer wg.Done()

   for{ 
       var v int
       var ok bool

       select{                      // 随机选择case
       case v,ok= <-c: 
           println("a1:",v) 
       case v,ok= <-c: 
           println("a2:",v) 
        } 

       if!ok{ 
           return
        } 
    } 
}() 

go func() { // 发送端 defer wg.Done() defer close(c)

   for i:=0;i<10;i++ { 
       select{                      // 随机选择case
       case c<-i: 
       case c<-i*10: 
        } 
    } 
}() 

wg.Wait() }

输出:

a1:0 a2:10 a2:2 a1:30 a1:40 a2:50 a2:60 a2:7 a1:8 a1:90 a1:0

当所有通道都不可用时,select会执行default语句。如此可避开select阻塞,但须注意处理外层循环,以免陷入空耗。

func main() { done:=make(chan struct{}) c:=make(chan int)

go func() { defer close(done)

   for{ 
      select{ 
       case x,ok:= <-c: 
           if!ok{ 
               return
            } 

           fmt.Println("data:",x) 
       default:                // 避免select阻塞 
        } 

       fmt.Println(time.Now()) 
       time.Sleep(time.Second) 
    } 
}() 

time.Sleep(time.Second*5)

c100 close(c)

<-done

}

输出:

2016-04-01 17:22:07 2016-04-01 17:22:08 2016-04-01 17:22:09 2016-04-01 17:22:10 2016-04-01 17:22:11 data:100 2016-04-01 17:22:12

也可用default处理一些默认逻辑。

func main() { done:=make(chan struct{})

data:= []chan int{ // 数据缓冲区 make(chan int,3), }

go func() { defer close(done)

   for i:=0;i<10;i++ { 
       select{ 
       case data[len(data)-1] <-i:          // 生产数据 
       default:                    // 当前通道已满,生成新的缓存通道 
           data=append(data,make(chan int,3)) 
        } 
    } 
}() 

<-done

for i:=0;i<len(data);i++ { // 显示所有数据 c:=data[i] close(c)

   for x:=range c{ 
       println(x) 
    } 
} 

}

模式

通常使用工厂方法将goroutine和通道绑定。

type receiver struct{ sync.WaitGroup data chan int }

func newReceiver() *receiver{ r:= &receiver{ data:make(chan int), }

r.Add(1) go func() { defer r.Done() for x:=range r.data{ // 接收消息,直到通道被关闭 println(“recv:“,x) } }()

return r }

func main() { r:=newReceiver() r.data1 r.data2

close(r.data) // 关闭通道,发出结束通知 r.Wait() // 等待接收者处理结束 }

输出:

recv:1 recv:2

鉴于通道本身就是一个并发安全的队列,可用作ID generator、Pool等用途。

type pool chan[]byte

func newPool(cap int)pool{ return make(chan[]byte,cap) }

func(p pool)get() []byte{ var v[]byte

select{ case v= p: // 返回 default: // 返回失败,新建 v=make([]byte,10) }

return v }

func(p pool)put(b[]byte) { select{ case pb: // 放回 default: // 放回失败,放弃 } }

用通道实现信号量(semaphore)。

func main() { runtime.GOMAXPROCS(4) var wg sync.WaitGroup

sem:=make(chan struct{},2) // 最多允许2个并发同时执行

for i:=0;i<5;i++ { wg.Add(1)

   go func(id int) { 
       defer wg.Done() 

       sem<-struct{}{}               //acquire: 获取信号 
       defer func() { <-sem}()      //release: 释放信号 

       time.Sleep(time.Second*2) 
       fmt.Println(id,time.Now()) 
    }(i) 
} 

wg.Wait() }

输出:

4 2016-02-19 18:24:09 0 2016-02-19 18:24:09

2 2016-02-19 18:24:11 1 2016-02-19 18:24:11

3 2016-02-19 18:24:13

标准库time提供了timeout和tick channel实现。

func main() { go func() { for{ select{ casetime.After(time.Second*5): fmt.Println(“timeout…”) os.Exit(0) } } }()

go func() { tick:=time.Tick(time.Second)

   for{ 
       select{ 
       case<-tick: 
           fmt.Println(time.Now()) 
        } 
    } 
}() 

<-(chan struct{})(nil)        // 直接用nil channel阻塞进程 

}

捕获INT、TERM信号,顺便实现一个简易的atexit函数。

import( “os” “os/signal” “sync” “syscall” )

var exits= &struct{ sync.RWMutex funcs []func() signals chan os.Signal }{}

func atexit(f func()) { exits.Lock() defer exits.Unlock() exits.funcs=append(exits.funcs,f) }

func waitExit() { if exits.signals==nil{ exits.signals=make(chan os.Signal) signal.Notify(exits.signals,syscall.SIGINT,syscall.SIGTERM) }

exits.RLock() for_,f:=range exits.funcs{ defer f() // 即便某些函数panic,延迟调用也能确保后续函数执行 } // 延迟调用按FILO顺序执行 exits.RUnlock()

<-exits.signals

}

func main() { atexit(func() {println(“exit1…“) }) atexit(func() {println(“exit2…“) })

waitExit() }

性能

将发往通道的数据打包,减少传输次数,可有效提升性能。从实现上来说,通道队列依旧使用锁同步机制,单次获取更多数据(批处理),可改善因频繁加锁造成的性能问题。

写个例子测试一下(代码中已尽可能规避额外开销)。

const( max =50000000 // 数据统计上限 block =500 // 数据块大小 bufsize=100 // 缓冲区大小 )

func test() { // 普通模式: 每次传递一个整数 done:=make(chan struct{}) c:=make(chan int,bufsize)

go func() { count:=0 for x:=range c{ count+=x }

   close(done) 
}() 

for i:=0;i<max;i++ { ci }

close(c) done

}

func testBlock() { // 块模式: 每次将500个数字打包成块传输 done:=make(chan struct{}) c:=make(chan[block]int,bufsize)

go func() { count:=0 for a:=range c{ for_,x:=range a{ count+=x } }

   close(done) 
}() 

for i:=0;i<max;i+=block{ var b[block]int // 使用数组对数据打包 for n:=0;n<block;n++ { b[n] =i+n if i+n==max-1{ break } }

   c<-b
} 

close(c) done }

输出:

BenchmarkTest-4 1 4299047783 ns/op 3296 B/op 8 allocs/op BenchmarkTestBlock-4 10 122825583 ns/op 401516 B/op 2 allocs/op

虽然单次消耗更多内存,但性能提升非常明显。如将数组改成切片会造成更多内存分配次数。

资源泄漏

通道可能会引发goroutine leak,确切地说,是指goroutine处于发送或接收阻塞状态,但一直未被唤醒。垃圾回收器并不收集此类资源,导致它们会在等待队列里长久休眠,形成资源泄漏。

func test() { c:=make(chan int)

for i:=0;i<10;i++ { go func() { c }() } }

func main() { test()

for{ time.Sleep(time.Second) runtime.GC() // 强制垃圾回收 } }

输出:

GODEBUG=“gctrace=1,schedtrace=1000,scheddetail=1” ./test

gc 33@33.112s 0%:0.019+0+0.22 ms clock,0.078+0/0/0+0.90 ms cpu,000 MB,0 MB goal,4 P(forced) SCHED 33204ms:gomaxprocs=4 idleprocs=4 threads=6 spinningthreads=0 idlethreads=4 runqueue=0 gcwaiting=0 nmidlelocked=0… P0:status=0 schedtick=2 syscalltick=33 m=-1 runqsize=0 gfreecnt=0 P1:status=0 schedtick=10 syscalltick=32 m=-1 runqsize=0 gfreecnt=0 P2:status=0 schedtick=1 syscalltick=2 m=-1 runqsize=0 gfreecnt=0 P3:status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0 M5:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M4:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M3:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M2:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M1:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=1 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M0:p=-1 curg=14 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 G1:status=4(sleep)m=-1 lockedm=-1 G2:status=4(force gc(idle))m=-1 lockedm=-1 G3:status=4(GC sweep wait)m=-1 lockedm=-1 G4:status=4(chan receive)m=-1 lockedm=-1 G5:status=4(chan receive)m=-1 lockedm=-1 G6:status=4(chan receive)m=-1 lockedm=-1 G7:status=4(chan receive)m=-1 lockedm=-1 G8:status=4(chan receive)m=-1 lockedm=-1 G9:status=4(chan receive)m=-1 lockedm=-1 G10:status=4(chan receive)m=-1 lockedm=-1 G11:status=4(chan receive)m=-1 lockedm=-1 G12:status=4(chan receive)m=-1 lockedm=-1 G13:status=4(chan receive)m=-1 lockedm=-1 G14:status=3(timer goroutine(idle))m=0 lockedm=-1

从监控结果可以看到大量goroutine一直处于chan receive状态,无法结束。